package org.elastic.compreplatform.crawler.core.thread;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

import org.elastic.compreplatform.conf.core.core.ConfZkConf;

import com.minstone.quartz.core.util.SpringConfigurerUtil;

/**
 * Blocking thread pool.
 *
 * <p>A {@link ThreadPoolExecutor} whose worker threads wait in
 * {@link #beforeExecute(Thread, Runnable)} while too many tasks are already
 * running, and are released again from {@link #afterExecute(Runnable, Throwable)}.
 * The class also owns one shared, statically configured pool instance that is
 * reached through {@link #addTaskToThreadPool(Runnable)} and {@link #isEndTask()}.
 *
 * @author JornTang
 * @date 2018-01-03
 */
public class TaskThreadPoolExecutor extends ThreadPoolExecutor {
    /** Guards {@link #currentTaskCount}. */
    private final ReentrantLock taskLock = new ReentrantLock();
    /** Signalled every time a task finishes, so throttled workers re-check the limit. */
    private final Condition unpaused = taskLock.newCondition();
    /** Throttle threshold supplied at construction time. */
    private final int maxTaskCount;
    /** Number of tasks between beforeExecute and afterExecute; only modified while holding taskLock. */
    private volatile int currentTaskCount;

    // Maximum number of running tasks; once exceeded, worker threads block.
    private static final int threadMaxSize = Integer.valueOf(ConfZkConf.get("elastic_compre.thread.max.size"));

    // Minimum number of threads the pool keeps alive.
    private static final int COREPOOLSIZE = Integer.valueOf(ConfZkConf.get("elastic_compre.thread.corepoolsize"));
    // Maximum number of threads the pool may hold.
    // NOTE: per the ThreadPoolExecutor documentation, with an unbounded work queue no more
    // than corePoolSize threads are ever created, so this value has no practical effect here.
    private static final int MAXINUMPOOLSIZE = Integer.valueOf(ConfZkConf.get("elastic_compre.thread.maxpoolsize"));
    // Idle time after which surplus threads are allowed to terminate.
    private static final long KEEPALIVETIME = Integer.valueOf(ConfZkConf.get("elastic_compre.thread.keeplivetime"));
    // Unit of KEEPALIVETIME.
    private static final TimeUnit UNIT = TimeUnit.SECONDS;
    // Work queue used by the pool; created without a bound, so its capacity is Integer.MAX_VALUE.
    private static final LinkedBlockingQueue<Runnable> WORKQUEUE = new LinkedBlockingQueue<Runnable>();
    // Rejection policy. AbortPolicy throws; CallerRunsPolicy runs the task on the submitting
    // thread; DiscardOldestPolicy drops the oldest queued task; DiscardPolicy drops the new task.
    // NOTE(review): this handler is declared but never installed on the pool, so the pool
    // currently uses ThreadPoolExecutor's default handler. Confirm whether it should be wired in.
    private static final CallerRunsPolicy HANDLER = new ThreadPoolExecutor.CallerRunsPolicy();
    // Pause between two polls in isEndTask(), in milliseconds.
    private static final long END_TASK_POLL_MILLIS = 10L;
    // The shared pool instance. Must be declared after the constants above because static
    // initializers run in textual order.
    private static final ThreadPoolExecutor threadPool = new TaskThreadPoolExecutor(
            COREPOOLSIZE, MAXINUMPOOLSIZE, KEEPALIVETIME, UNIT, WORKQUEUE, threadMaxSize);

    /**
     * Submits a task to the shared pool.
     *
     * @param threadTask the task to execute
     * @author JornTang
     * @date 2017-11-30
     */
    public static void addTaskToThreadPool(Runnable threadTask) {
        threadPool.execute(threadTask);
    }

    /**
     * Creates a throttling pool. All parameters except {@code maxTaskCount} are passed
     * straight to {@link ThreadPoolExecutor}.
     *
     * @param corePoolSize    number of threads to keep in the pool
     * @param maximumPoolSize maximum number of threads allowed in the pool
     * @param keepAliveTime   idle time before surplus threads terminate
     * @param unit            unit of {@code keepAliveTime}
     * @param workQueue       queue holding tasks before they are executed
     * @param maxTaskCount    throttle threshold used by {@link #beforeExecute(Thread, Runnable)}
     */
    public TaskThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
            long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, int maxTaskCount) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        this.maxTaskCount = maxTaskCount;
    }

    /**
     * Holds the worker back while the running-task counter is above the limit, then
     * counts the task as running.
     *
     * <p>NOTE(review): the wait condition is {@code maxTaskCount < currentTaskCount}, so a
     * task is admitted while the counter is still equal to {@code maxTaskCount}; up to
     * {@code maxTaskCount + 1} tasks can therefore run at once. Confirm whether that is
     * intended before tightening the comparison (a limit of 0 would then never admit a task).
     *
     * @param t the thread that will run the task
     * @param r the task about to be executed
     */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        taskLock.lock();
        try {
            while (maxTaskCount < currentTaskCount) {
                // Uninterruptible wait: an interrupt must not release the throttle early.
                // Catching InterruptedException and re-interrupting inside this loop would make
                // every following await() throw immediately and turn the loop into a hot spin
                // while holding the lock. awaitUninterruptibly() keeps waiting and leaves the
                // thread's interrupt status set when it returns, so the task still sees it.
                unpaused.awaitUninterruptibly();
            }
            currentTaskCount++;
        } finally {
            taskLock.unlock();
        }
    }

    /**
     * Marks the task as finished and wakes every worker waiting in
     * {@link #beforeExecute(Thread, Runnable)} so it can re-check the limit.
     *
     * @param r the task that has completed
     * @param t the exception that caused termination, or {@code null}
     */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        taskLock.lock();
        try {
            currentTaskCount--;
            unpaused.signalAll();
        } finally {
            taskLock.unlock();
        }
    }

    /**
     * Blocks the caller until the shared pool has no active worker and no queued task,
     * then returns {@code true}. This method never returns {@code false}.
     *
     * <p>The state is polled every {@value #END_TASK_POLL_MILLIS} ms instead of spinning.
     * The queue is checked as well as the active count because the active count can read
     * 0 in the gap between a worker finishing one task and taking the next one.
     * If the calling thread is interrupted while waiting, waiting continues and the
     * interrupt status is restored before returning.
     *
     * @return {@code true} once the pool is idle
     * @author JornTang
     * @date 2017-12-26
     */
    public static boolean isEndTask() {
        boolean interrupted = false;
        try {
            while (threadPool.getActiveCount() != 0 || !threadPool.getQueue().isEmpty()) {
                try {
                    Thread.sleep(END_TASK_POLL_MILLIS);
                } catch (InterruptedException e) {
                    // Remember the interrupt and keep waiting; re-asserting it here would make
                    // every following sleep() throw immediately and degrade into a busy loop.
                    interrupted = true;
                }
            }
            return true;
        } finally {
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }
}
 

